package com.itcs.redis.queue;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;

/**
 * Redis队列 实现类
 * 
 * @author LiuPeng
 * @date 2018年1月11日
 * 
 */
public class RedisQueueTemplate {

	private Log logger = LogFactory.getLog(RedisQueueTemplate.class);

	private StringRedisTemplate redisTemplate;

	private static final String QUESUE_LOCK_KEY_SUFFIX = "_itcs_lock";

	public RedisQueueTemplate(StringRedisTemplate redisTemplate) {
		if (redisTemplate == null) {
			throw new RuntimeException("redisTemplate can not be null ");
		}
		this.redisTemplate = redisTemplate;
	}

	public long rPush(final String queueName, final String value) {
		return redisTemplate.execute(new RedisCallback<Long>() {
			@Override
			public Long doInRedis(RedisConnection connection) throws DataAccessException {
				return connection.rPush(redisTemplate.getStringSerializer().serialize((queueName)),
						redisTemplate.getStringSerializer().serialize(value));
			}
		});
	}

	public String lPop(final String queueName) {
		return redisTemplate.execute(new RedisCallback<String>() {
			@Override
			public String doInRedis(RedisConnection connection) throws DataAccessException {
				byte[] value = connection.lPop(redisTemplate.getStringSerializer().serialize((queueName)));
				return redisTemplate.getStringSerializer().deserialize(value);
			}
		});
	}

	public long rPush(final String queueName, final List<String> values) {
		if (values != null && values.size() > 0) {
			return redisTemplate.execute(new RedisCallback<Long>() {
				@Override
				public Long doInRedis(RedisConnection connection) throws DataAccessException {
					List<byte[]> valuesByte = new ArrayList<>();
					for (String value : values) {
						if (value != null) {
							valuesByte.add(redisTemplate.getStringSerializer().serialize(value));
						}
					}
					return connection.rPush(redisTemplate.getStringSerializer().serialize((queueName)),
							valuesByte.toArray(new byte[valuesByte.size()][]));
				}
			});
		}
		return 0L;
	}

	public List<String> lPop(final String queueName, final int length) {
		return redisTemplate.execute(new RedisCallback<List<String>>() {
			// long s = System.currentTimeMillis();
			@Override
			public List<String> doInRedis(RedisConnection connection) throws DataAccessException {
				List<String> values = new ArrayList<>();
				byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
				byte[] lockKey = redisTemplate.getStringSerializer().serialize((queueName) + QUESUE_LOCK_KEY_SUFFIX);
				try {
					while (!connection.setNX(lockKey, key)) {// 队列加锁
						try {
							Thread.sleep(10);
						} catch (Exception e) {
							logger.warn(" RedisQueueTemplate sleep Exception .....");
						}
					}
					connection.expire(lockKey, 5);// 设计锁的超时时间

					List<byte[]> valuesByte = connection.lRange(key, 0, length - 1);
					connection.lTrim(key, valuesByte.size(), -1);

					for (byte[] valueByte : valuesByte) {
						values.add(redisTemplate.getStringSerializer().deserialize(valueByte));
					}

				} finally {
					connection.del(lockKey);
				}
				// long s2 = System.currentTimeMillis();
				// if (s2 - 1000 > s) {
				// logger.warn("lPop WARN Time consuming: " + (s2 - 1000 > s));
				// }
				return values;
			}
		});
	}

	public List<String> lRange(final String queueName, final int length) {
		return redisTemplate.execute(new RedisCallback<List<String>>() {
			@Override
			public List<String> doInRedis(RedisConnection connection) throws DataAccessException {
				List<String> values = new ArrayList<>();
				byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
				List<byte[]> valuesByte = connection.lRange(key, 0, length - 1);
				for (byte[] valueByte : valuesByte) {
					values.add(redisTemplate.getStringSerializer().deserialize(valueByte));
				}
				return values;
			}
		});
	}

	public void lRem(final String queueName, final int length) {
		redisTemplate.execute(new RedisCallback<Long>() {
			@Override
			public Long doInRedis(RedisConnection connection) throws DataAccessException {
				byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
				connection.lTrim(key, length, -1);
				return 1L;
			}
		});
	}

	public long lLen(final String queueName) {
		return redisTemplate.execute(new RedisCallback<Long>() {
			@Override
			public Long doInRedis(RedisConnection connection) throws DataAccessException {
				byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
				return connection.lLen(key);
			}
		});
	}

}
